package com.atguigu.loginfail_detect

import java.util

import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Detects consecutive login failures using Flink CEP.
  *
  * Reads login events from the `/LoginLog.csv` classpath resource, keys them by
  * user id, and prints a warning for every pair of directly consecutive "fail"
  * events of the same user that occur within 2 seconds of each other.
  *
  * @author: yangShen
  * @Date: 2020/5/8 14:33
  */
object LoginFailWithCep {
  /**
    * Builds and runs the streaming job.
    *
    * @param args command-line arguments (unused)
    * @throws java.io.FileNotFoundException if `/LoginLog.csv` is not on the classpath
    */
  def main(args: Array[String]): Unit = {

    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    environment.setParallelism(1)
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 1. Read the raw event data and build the simple event stream.
    // getResource returns null when the resource is missing; fail with a clear
    // message instead of a NullPointerException on `getPath`.
    val source = Option(getClass.getResource("/LoginLog.csv")).getOrElse(
      throw new java.io.FileNotFoundException("Resource /LoginLog.csv was not found on the classpath")
    )
    val loginEventStream = environment.readTextFile(source.getPath)
      .map(data => {
        // Each line is expected to hold four comma-separated fields; the 1st and
        // 4th are parsed as Long (NOTE(review): presumably userId and eventTime —
        // confirm against the LoginEvent declaration).
        val dataArray = data.split(",")
        LoginEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)
      })
      // Assign event timestamps. Out-of-order data is tolerated: the watermark
      // trails the largest timestamp seen so far by 5 seconds.
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(5)) {
        // Flink expects epoch milliseconds (assumes eventTime is in seconds — TODO confirm).
        override def extractTimestamp(element: LoginEvent): Long = element.eventTime * 1000L
      })
      .keyBy(_.userId)

    // 2. Define the pattern to match: a "fail" event ...
    val loginFailPattern = Pattern.begin[LoginEvent]("begin").where(_.eventType == "fail")
      // ... directly followed (.next = strict contiguity, no event in between)
      // by another "fail" event ...
      .next("next").where(_.eventType == "fail")
      // ... with the whole sequence completing within 2 seconds.
      .within(Time.seconds(2))

    // 3. Apply the pattern to the keyed event stream to obtain a pattern stream
    //    -- arguments: (input stream, pattern).
    val patternStream = CEP.pattern(loginEventStream, loginFailPattern)

    // 4. Apply a select function to the pattern stream to turn every matched
    //    event sequence into a Warning.
    val loginFailDataStream = patternStream.select(new LoginFailMatch())

    loginFailDataStream.print()

    environment.execute("login fail with cep job")
  }
}

/**
  * Turns one matched login-failure sequence into a [[Warning]].
  *
  * The match map is looked up under the keys "begin" and "next"; the first event
  * stored under each key supplies the warning's fields.
  */
class LoginFailMatch() extends PatternSelectFunction[LoginEvent, Warning]{
  /**
    * @param map matched events grouped by pattern stage name
    * @return a Warning built from the user id and event time of the "begin" event
    *         and the event time of the "next" event
    */
  override def select(map: util.Map[String, util.List[LoginEvent]]): Warning = {
    // First event recorded for the given pattern stage.
    def firstOf(stage: String): LoginEvent = {
      val matched = map.get(stage)
      matched.iterator().next()
    }

    val opening = firstOf("begin")
    val closing = firstOf("next")

    Warning(
      opening.userId,
      opening.eventTime,
      closing.eventTime,
      "login fail"
    )
  }
}